package com.meta.api.flink.aggregate;

import org.apache.flink.table.functions.AggregateFunction;

/**
 * Container for the "boolean trend continue" aggregate operator.
 *
 * <p>The aggregate watches a stream of numeric samples that are compared against
 * {@code 0} and {@code 1}. It reports {@code true} once the requested jump
 * (0 to 1, or 1 to 0) has been seen, and permanently reports {@code false} as soon
 * as the opposite jump is seen afterwards or beforehand.
 */
public class BooleanTrendContinueFunctions {

    /** Name under which the operator is registered. */
    public static final String BOOLTRENDCONTINUE = "BOOLTRENDCONTINUE";

    /**
     * Mutable accumulator for {@link BooleanTrendContinueAvg}.
     *
     * <p>Fields are public and the class has an implicit no-arg constructor;
     * presumably this is what lets Flink treat it as a POJO accumulator — confirm
     * before changing field visibility or names.
     */
    public static class ChooseHitAccum {
        /** Set to {@code true} once the opposite jump was seen; evaluation stops for good. */
        public boolean flag = false;
        /** The previous sample, or {@code null} while no sample has been stored yet. */
        public Double lastValue = null;
        /** Current result: {@code true} when the requested jump has been observed. */
        public boolean re = false;
    }

    /**
     * Aggregate function that detects a jump and checks that it is not reverted.
     *
     * <pre>
     * // register function
     * StreamTableEnvironment tEnv = ...
     * tEnv.registerFunction("BOOLTRENDCONTINUE", new BooleanTrendContinueAvg());
     * // use function
     * tEnv.sqlQuery("SELECT user, BOOLTRENDCONTINUE(true|false, number) AS avgPoints FROM userScores GROUP BY user");
     * </pre>
     */
    public static class BooleanTrendContinueAvg extends AggregateFunction<Boolean, ChooseHitAccum> {

        /**
         * Creates a fresh accumulator: no previous sample, not finished, result {@code false}.
         *
         * @return a new, empty accumulator
         */
        @Override
        public ChooseHitAccum createAccumulator() {
            return new ChooseHitAccum();
        }

        /**
         * Returns the current result stored in the accumulator.
         *
         * @param acc the accumulator to read
         * @return {@code acc.re}: whether the requested jump is currently considered observed
         */
        @Override
        public Boolean getValue(ChooseHitAccum acc) {
            return acc.re;
        }

        /**
         * Folds one sample into the accumulator.
         *
         * <p>The first non-null sample is only cached. Each later sample is compared with
         * the cached one:
         * <ul>
         *   <li>{@code type == true} (pattern 0 then 1111...): a 0-to-1 jump sets the result to
         *       {@code true}; a 1-to-0 jump sets it to {@code false} and ends evaluation.</li>
         *   <li>{@code type == false} (pattern 1 then 0000...): a 1-to-0 jump sets the result to
         *       {@code true}; a 0-to-1 jump sets it to {@code false} and ends evaluation.</li>
         * </ul>
         * Any other pair of samples leaves the result untouched.
         *
         * <p>A {@code null} sample is ignored entirely. Previously it caused a
         * {@link NullPointerException} when it was unboxed for comparison.
         *
         * @param acc   the accumulator to update
         * @param type  which jump to look for: {@code true} for 0 to 1, {@code false} for 1 to 0
         * @param value the current sample; may be {@code null}, in which case it is skipped
         */
        public void accumulate(ChooseHitAccum acc, boolean type, Double value) {
            if (value == null) {
                // Nothing to compare; keep the last real sample so the next one can still form a jump.
                return;
            }

            // The very first sample has no predecessor, and a finished accumulator is frozen.
            if (acc.lastValue != null && !acc.flag) {
                double previous = acc.lastValue;
                double current = value;

                boolean risingEdge = previous == 0 && current == 1;
                boolean fallingEdge = previous == 1 && current == 0;

                boolean requestedJump = type ? risingEdge : fallingEdge;
                boolean oppositeJump = type ? fallingEdge : risingEdge;

                if (requestedJump) {
                    acc.re = true;
                } else if (oppositeJump) {
                    acc.re = false;
                    // End evaluation: later samples can no longer change the result.
                    acc.flag = true;
                }
            }

            // Cache the sample for the next comparison.
            acc.lastValue = value;
        }

    }
}
